<?php
namespace Swork\Process;

use Swoole\Process;
use Swork\Bean\Holder\InstanceHolder;
use Swork\Bean\Scanner;
use Swork\Configer;
use Swork\Initialize;
use Swork\Pool\Amqp\AmqpConfig;
use Swork\Service;

/**
 * 用户进程
 * Class UserProcess
 * @package Swork\Process
 */
class UserProcess
{
    /**
     * 用户进程数量
     * @var int
     */
    private $nums = 0;

    /**
     * 当前服务对象
     * @var \swoole_websocket_server
     */
    private $server;

    /**
     * 初始化用户进程
     * @param \swoole_websocket_server $server
     */
    public function init(\swoole_websocket_server $server)
    {
        $this->server = $server;

        //收集注解
        $scanner = new Scanner(Service::$env, 0);
        $scanner->collect();

        //增加AmqpConsumeProcess
        //根据配置实现消费队列是单一线程，还是多线程（每1个队列名1个线程）
        $this->nums = 0;
        $amqpConfig = new AmqpConfig(Configer::get('amqp'), 'default');
        if ($amqpConfig->isEnable())
        {
            $tasks = $scanner->getQueueTask();
            if ($amqpConfig->isThreads())
            {
                foreach ($tasks as $cls => $items)
                {
                    foreach ($items as $name => $args)
                    {
                        $threads = $args['threads'];
                        for ($i = 0; $i < $threads; $i++)
                        {
                            $holder = [
                                $cls => [$name => $args]
                            ];
                            $this->addProcess(QueueTaskProcess::class, $holder);
                            $this->nums += 1;
                        }
                    }
                }
            }
            else
            {
                if (count($tasks) > 0)
                {
                    $this->addProcess(QueueTaskProcess::class, $tasks);
                    $this->nums += 1;
                }
            }
        }

        //获取收集到的其它子线程Holder
        $holder = $scanner->getProcess();
        $this->nums += count($holder);
        foreach ($holder as $cls => $exts)
        {
            $this->addProcess($cls, $exts);
        }
    }

    /**
     * 等待子进程结束运行
     */
    public function waitToExit()
    {
        if ($this->nums > 0)
        {
            \swoole_process::wait(true);
        }
    }

    /**
     * 退出程程
     */
    public static function closeToExit()
    {
        Process::signal(SIGTERM, function ($signo) {
            echo 'Process Close to Exist' . PHP_EOL;
        });
    }

    /**
     * 增加用户进程
     * @param string $cls
     * @param array $exts
     */
    private function addProcess(string $cls, array $exts)
    {
        //初始化用户进程
        $process = new Process(function (\swoole_process $worker) use ($cls, $exts) {

            //收集注解
            $inti = new Initialize();
            $inti->collect($worker->pid);

            //初始化内置对象
            $inti->db();
            $inti->redis();
            $inti->rpc();
            $inti->amqp();
            $inti->ElasticSearch();

            //回调
            $inc = InstanceHolder::getClass($cls);
            if (!($inc instanceof ProcessHandlerInterface))
            {
                return;
            }
            $inc->run($this->server, $worker, $exts);
        });

        //加入主服务
        Service::$server->addProcess($process);
    }
}
